package com.vf.cloud.server.colony.handler;

import com.alibaba.fastjson.JSONObject;
import com.jfinal.kit.StrKit;
import com.vf.cloud.common.domain.Config;
import com.vf.cloud.common.util.RequestParser;
import com.vf.cloud.server.colony.MasterServer;
import com.vf.cloud.server.colony.pool.NodePool;
import com.vf.cloud.server.colony.util.ColonyUtil;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpMessage;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpUtil;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
import io.netty.handler.codec.http.websocketx.PingWebSocketFrame;
import io.netty.handler.codec.http.websocketx.PongWebSocketFrame;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketServerHandshaker;
import io.netty.handler.codec.http.websocketx.WebSocketServerHandshakerFactory;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.util.CharsetUtil;
import java.util.Map;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class MasterChannelInboundHandlerHandler extends SimpleChannelInboundHandler<Object> {

	@Override
	public void channelActive(ChannelHandlerContext ctx) throws Exception {

		if (2 == MasterServer.getInstance().getStatus()) {
			log.info(String.format("Node is connected:" + ctx.channel().remoteAddress()));
		} else {
			log.info(String.format("Node : " + MasterServer.getInstance().getStatus() + ",Now close Channel:"
					+ ctx.channel().remoteAddress()));
			ctx.close();
		}

	}

	@Override
	public void channelInactive(ChannelHandlerContext ctx) throws Exception {
		ColonyUtil.disconnected(ctx.channel().id().asLongText());
	}

	@Override
	public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
		ctx.channel().flush();
	}

	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
		ctx.close();
	}

	@Override
	protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
		try {

			if (2 == MasterServer.getInstance().getStatus()) {
				if (msg instanceof FullHttpRequest) {
					handleHttpRequest(ctx, (FullHttpRequest) msg);
				} else {
					handlerWebSocketFrame(ctx, msg);
				}
			}

		} finally {
		}
	}

	WebSocketServerHandshaker handshaker;

	/**
	 * 处理 FullHttpRequest
	 * 
	 * @param ctx
	 * @param request
	 */
	protected void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest req) {
		if (!req.decoderResult().isSuccess() || (!"websocket".equals(req.headers().get("Upgrade")))) {
			sendHttpResponse(ctx, req,
					new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST));
			return;
		}

		String uri = req.uri();
		WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory(
				"ws://" + req.headers().get(HttpHeaderNames.HOST) + uri, null, false);
		handshaker = wsFactory.newHandshaker(req);
		if (handshaker == null) {
			WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());
		} else {
			handshaker.handshake(ctx.channel(), req);
		}

		if (uri.contains("?")) {
			log.info("Node comming!");
			try {
				Map<String, String> param = new RequestParser(req).parse();
				if (!param.containsKey("key")) {
					log.error("口令缺失.");
					ctx.close();
					return;
				}
				String key = param.get("key");

				Config config = Config.dao.findFirst("select * from " + Config.TABLE_NAME + " where local_pwd=? ", key);
				if (config == null) {
					log.error("口令无效:%s", key);
					ctx.close();
					return;
				}
				NodePool.addNode(key, ctx);
			} catch (Exception e) {
				log.error("非法节点.", e);
				ctx.close();
			}
		} else {
			log.error("非法节点.");
			ctx.close();
		}

	}

	public static void sendHttpResponse(ChannelHandlerContext ctx, FullHttpRequest req, DefaultFullHttpResponse res) {
		if (res.status().code() != 200) {
			ByteBuf buf = Unpooled.copiedBuffer(res.status().code() + "", CharsetUtil.UTF_8);
			res.content().writeBytes(buf);
			buf.release();
		}
		// 如果是非Keep-Alive，关闭连接
		if (!HttpUtil.isKeepAlive((HttpMessage) req.protocolVersion()) || res.status().code() != 200) {
			ctx.channel().writeAndFlush(res).addListener(ChannelFutureListener.CLOSE);
		}
	}

	private void handlerWebSocketFrame(ChannelHandlerContext ctx, Object frame) {
		// 判断是否关闭链路的指令
		if (frame instanceof CloseWebSocketFrame) {
			CloseWebSocketFrame closeWebSocketFrame = (CloseWebSocketFrame) frame;
			handshaker.close(ctx.channel(), (CloseWebSocketFrame) closeWebSocketFrame.retain());
			return;
		}
		// 判断是否ping消息
		if (frame instanceof PingWebSocketFrame) {
			return;
		}

		if (frame instanceof TextWebSocketFrame) {

			TextWebSocketFrame textWebSocketFrame = (TextWebSocketFrame) frame;
			JSONObject rawMsg = null;
			try {
				rawMsg = JSONObject.parseObject(textWebSocketFrame.text());
			} catch (Exception e) {
				log.error(String.format("[%s]>> cannot parse Node message:%s", 1008, e));
				ctx.close();
			}
			if (rawMsg == null) {
				return;
			}

			if(!rawMsg.containsKey("type")) {
				log.error("Invalid node message:"+textWebSocketFrame.text());
				return;
			}

			//log.info(String.format(" <- Node【%s】: %s", ctx.channel().remoteAddress(), textWebSocketFrame.text()));
			String type = rawMsg.getString("type");
			if (StrKit.equals(type, "ping")) {
				ColonyUtil.sendPing(ctx, rawMsg.getLong("time"));
				return;
			}


			if (!rawMsg.containsKey("data") ) {
				log.error("Invalid node message. ");
				return;
			}

			if (StrKit.equals(type, "GPU_RESOURCE")) {
				ColonyUtil.handleGPUState(ctx, rawMsg);
			} else if (StrKit.equals(type, "disconnectPlayer")) {
			} else {
				log.error("unsupported Node message type:%s", type);
				ctx.close();
			}

		}
	}

	/**
	 * 向客户端发送心跳
	 * 
	 * @param ctx
	 * @param evt
	 * @throws Exception
	 */
	@Override
	public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
		if (evt instanceof IdleStateEvent) {
			IdleStateEvent event = (IdleStateEvent) evt;
			if (event.state().equals(IdleState.READER_IDLE)) {
				ColonyUtil.sendPing(ctx, System.currentTimeMillis());
			} else if (event.state().equals(IdleState.WRITER_IDLE)) {
				ColonyUtil.sendPing(ctx, System.currentTimeMillis());
			} else if (event.state().equals(IdleState.ALL_IDLE)) {
				ColonyUtil.sendPing(ctx, System.currentTimeMillis());
			}
		}
	}

}
